[SPARK-29043][Core] Improve the concurrent performance of History Server #25797
Conversation
Force-pushed from ff7b9b9 to fa89be3
ok to test cc @wangyum

retest please

ok to test

cc @vanzin too
HeartSaVioR left a comment:

The code change looks OK, but I might be missing the effect of "listing" and "reloading applications" running concurrently. It would mean some code paths are now accessed by multiple threads where they previously were not, so that may need checking.
ok to test
Force-pushed from d939301 to ec387f3
Test build #110636 has finished for PR 25797 at commit
gentle ping @dongjoon-hyun @HyukjinKwon

I just modified a method from protected to private in the new commit; it should not impact the test result.

Test build #111038 has finished for PR 25797 at commit
Ngone51 left a comment:

Looks reasonable.
I just added some comments and renamed a method, which should not impact the test result.

Test build #111074 has finished for PR 25797 at commit
@cloud-fan Could you help take a look?

Seems good to me. I have tested it for a week, and it works well.
Test build #113321 has finished for PR 25797 at commit

Test build #113328 has finished for PR 25797 at commit

Test build #113330 has finished for PR 25797 at commit

Test build #113332 has finished for PR 25797 at commit
    endProcessing(reader.rootPath)
    pendingReplayTasksCount.decrementAndGet()
    ...
    val isExpired = scanTime + conf.get(MAX_LOG_AGE_S) * 1000 < clock.getTimeMillis()

This is not right. Expiration is based on the time the log was last updated, not the time it was last scanned.
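To make the reviewer's point concrete, here is a minimal, self-contained sketch (ExpiryCheckSketch and its parameter names are illustrative stand-ins, not the actual FsHistoryProvider code): expiry should be computed from the log's last-updated timestamp, not from the last scan time.

```scala
object ExpiryCheckSketch {
  // A log is expired when its last update is older than the retention window,
  // independent of when the log was last scanned.
  def isExpired(lastUpdatedMs: Long, nowMs: Long, maxLogAgeMs: Long): Boolean =
    lastUpdatedMs + maxLogAgeMs < nowMs
}
```

With this shape, a frequently scanned but recently updated log is never deleted by mistake.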
    val isExpired = scanTime + conf.get(MAX_LOG_AGE_S) * 1000 < clock.getTimeMillis()
    if (isExpired) {
      listing.delete(classOf[LogInfo], reader.rootPath.toString)

You may also need to remove the application attempt that refers to the log from the listing database. Basically you have to do what cleanLogs does, both to decide whether the log is expired and to determine what needs to be deleted.

done.
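A rough sketch of the cleanup the reviewer describes (the maps and names below are illustrative stand-ins for the listing database, not the actual cleanLogs code): when a log is expired, both its LogInfo entry and the application attempt that refers to it should be removed.

```scala
import scala.collection.mutable

object CleanupSketch {
  val logInfos = mutable.Map[String, String]()   // log rootPath -> appId
  val appAttempts = mutable.Map[String, Long]()  // appId -> oldest attempt time (ms)

  def deleteExpired(rootPath: String, maxTime: Long): Unit =
    logInfos.remove(rootPath).foreach { appId =>
      // Also drop the attempt if it is older than the retention cutoff,
      // mirroring what cleanLogs does for the listing database.
      if (appAttempts.get(appId).exists(_ <= maxTime)) appAttempts.remove(appId)
    }
}
```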
Test build #113379 has finished for PR 25797 at commit

retest this please.

Force-pushed from d5ff72c to 579c1ad
rebased.

Test build #113625 has finished for PR 25797 at commit

Test build #113626 has finished for PR 25797 at commit
@turboFei Hi, could you address the review comments? This is good to have and seems close to being merged (according to #26416 (review)).

Thanks, I will address it as soon as possible.

Force-pushed from 7d1301c to c6ac35e
    log.appId.foreach { appId =>
      val app = listing.read(classOf[ApplicationInfoWrapper], appId)
      if (app.oldestAttempt() <= maxTime) {

Here the logic is consistent with cleanLogs(). But I think there is an overlap between app.oldestAttempt() <= maxTime and attempt.info.lastUpdated.getTime() >= maxTime, even though it does not matter.

Test build #115300 has finished for PR 25797 at commit
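To illustrate the overlap noted above (the object and parameter names here are illustrative, not the actual code): when an attempt's last-updated time equals the cutoff, the "expired" condition and the "retained" condition both hold at once, which is the boundary case the comment refers to.

```scala
object OverlapSketch {
  // Condition used when expiring: the app's oldest attempt is at or before the cutoff.
  def oldestAttemptExpired(oldestAttemptMs: Long, maxTimeMs: Long): Boolean =
    oldestAttemptMs <= maxTimeMs

  // Condition used when retaining: the attempt was updated at or after the cutoff.
  def attemptRetained(lastUpdatedMs: Long, maxTimeMs: Long): Boolean =
    lastUpdatedMs >= maxTimeMs
}
```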
Test build #115334 has finished for PR 25797 at commit
vanzin left a comment:

Looks ok now. Merging to master.
    test("SPARK-29043: clean up specified event log") {
      val clock = new ManualClock()
      val conf = createTestConf().set(MAX_LOG_AGE_S.key, "0").set(CLEANER_ENABLED.key, "true")

No need to use .key here. (I'll fix this during merge.)
What changes were proposed in this pull request?

Even if we set spark.history.fs.numReplayThreads to a large number, such as 30, the history server still replays logs slowly. We found that if there is a straggler in a batch of replay tasks, all the other threads wait for that straggler. In this PR, we introduce a processing set to track the logs that are being replayed, so that the replay tasks can execute asynchronously.
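The mechanism described above can be sketched roughly as follows (a minimal stand-alone sketch; AsyncReplaySketch, submitLogs, and the fixed four-thread pool are illustrative stand-ins, not the actual FsHistoryProvider members): a concurrent set records the logs currently being replayed, so each scan submits only logs that are not in flight and never blocks on a straggler from a previous batch.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory}

object AsyncReplaySketch {
  // Logs currently being replayed; a ConcurrentHashMap-backed set is thread-safe.
  private val processing = ConcurrentHashMap.newKeySet[String]()

  private val pool = Executors.newFixedThreadPool(4, new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r); t.setDaemon(true); t
    }
  })

  def submitLogs(paths: Seq[String])(replay: String => Unit): Unit =
    paths.foreach { path =>
      // add() returns false if another thread is already replaying this log,
      // so each scan skips in-flight logs instead of waiting on stragglers.
      if (processing.add(path)) {
        pool.submit(new Runnable {
          def run(): Unit =
            try replay(path)
            finally processing.remove(path) // analogous to endProcessing in the PR
        })
      }
    }
}
```

The key design point is that membership in the set, not batch completion, gates resubmission: a slow replay holds only its own entry while other logs keep flowing through the pool.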
Why are the changes needed?

It accelerates log replay in the history server.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

UT.